Open
…va-questdb-client into jh_experiment_new_ilp
sendPongFrame() used the shared sendBuffer, calling reset() which destroyed any partially-built frame the caller had in progress via getSendBuffer(). This could happen when a PING arrived during receiveFrame()/tryReceiveFrame() while the caller was mid-way through constructing a data frame. Add a dedicated 256-byte controlFrameBuffer for sending pong responses. RFC 6455 limits control frame payloads to 125 bytes plus a 14-byte max header, so 256 bytes is sufficient and never needs to grow. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
sendCloseFrame() used reason.length() (UTF-16 code units) to calculate the payload size, but wrote reason.getBytes(UTF_8) (UTF-8 bytes) into the buffer. For non-ASCII close reasons, UTF-8 encoding can be longer than the UTF-16 length, causing writes past the declared payload size. This corrupted the frame header length, the masking range, and could overrun the allocated buffer. Compute the UTF-8 byte array upfront and use its length for all sizing calculations. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
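The sizing rule in this commit can be illustrated with a minimal sketch. It is not the client's code: the class `CloseFramePayload` and the method `build` are hypothetical names, and a heap `ByteBuffer` stands in for the native send buffer. The point is only that the UTF-8 byte array is computed once and its length drives every size calculation.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, for illustration only.
final class CloseFramePayload {
    // RFC 6455 limits control frame payloads to 125 bytes.
    static final int MAX_CONTROL_PAYLOAD = 125;

    // Builds the unmasked close payload: 2-byte big-endian status code followed by the UTF-8 reason.
    static byte[] build(int code, String reason) {
        byte[] reasonBytes = reason == null ? new byte[0] : reason.getBytes(StandardCharsets.UTF_8);
        // Size from the encoded bytes, never from reason.length() (UTF-16 code units).
        int payloadLen = 2 + reasonBytes.length;
        if (payloadLen > MAX_CONTROL_PAYLOAD) {
            throw new IllegalArgumentException("close reason too long: " + payloadLen + " bytes");
        }
        ByteBuffer buf = ByteBuffer.allocate(payloadLen); // ByteBuffer is big-endian by default
        buf.putShort((short) code);
        buf.put(reasonBytes);
        return buf.array();
    }
}
```

For a reason such as "h\u00e9llo", `length()` reports 5 while the UTF-8 encoding takes 6 bytes, which is exactly the mismatch the commit describes.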
When receiving a CLOSE frame from the server, the client now echoes a close frame back before marking the connection as no longer upgraded. This is required by RFC 6455 Section 5.5.1. The close code parsing was moved out of the handler-null check so the code is always available for the echo. The echo uses the dedicated controlFrameBuffer to avoid clobbering any in-progress frame in the main send buffer. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Handle CONTINUATION frames (opcode 0x0) in tryParseFrame() which were previously silently dropped. Fragment payloads are accumulated in a lazily-allocated native memory buffer and delivered as a complete message to the handler when the final FIN=1 frame arrives. The FIN bit is now checked on TEXT/BINARY frames: FIN=0 starts fragment accumulation, FIN=1 delivers immediately. Protocol errors are raised for continuation without an initial fragment and for overlapping fragmented messages. The fragment buffer is freed in close() and the fragmentation state is reset on disconnect(). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
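The fragmentation rules described in this commit can be sketched as a small state machine. This is an illustration under assumptions, not the code in tryParseFrame(): the class `FragmentAssembler` is hypothetical, and a `ByteArrayOutputStream` stands in for the lazily allocated native memory buffer.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical reassembler, for illustration only.
final class FragmentAssembler {
    static final int OP_CONTINUATION = 0x0;
    static final int OP_TEXT = 0x1;
    static final int OP_BINARY = 0x2;

    private ByteArrayOutputStream fragments; // allocated on the first fragmented message
    private boolean inProgress;

    // Returns the complete message, or null while more fragments are expected.
    byte[] onFrame(int opcode, boolean fin, byte[] payload) {
        if (opcode == OP_CONTINUATION) {
            if (!inProgress) {
                throw new IllegalStateException("continuation frame without initial fragment");
            }
            fragments.write(payload, 0, payload.length);
            if (!fin) {
                return null;
            }
            byte[] message = fragments.toByteArray();
            reset();
            return message;
        }
        // TEXT or BINARY frame
        if (inProgress) {
            throw new IllegalStateException("new data frame while a fragmented message is in progress");
        }
        if (fin) {
            return payload; // unfragmented message, delivered immediately
        }
        if (fragments == null) {
            fragments = new ByteArrayOutputStream();
        }
        fragments.write(payload, 0, payload.length);
        inProgress = true;
        return null;
    }

    // Mirrors the state reset that the commit performs on disconnect().
    void reset() {
        inProgress = false;
        if (fragments != null) {
            fragments.reset();
        }
    }
}
```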
Add a configurable maximum size for the WebSocket receive buffer, mirroring the pattern already used by WebSocketSendBuffer. Previously, growRecvBuffer() doubled the buffer without any upper bound, allowing a malicious server to trigger out-of-memory by sending arbitrarily large frames. Add getMaximumResponseBufferSize() to HttpClientConfiguration (defaulting to Integer.MAX_VALUE for backwards compatibility) and enforce the limit in both growRecvBuffer() and appendToFragmentBuffer(), which had the same unbounded growth issue for fragmented messages. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
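A bounded doubling policy of the kind described here might look like the following sketch. The class `BoundedGrowth` and the method `nextCapacity` are hypothetical; the real limit comes from getMaximumResponseBufferSize(), which is represented below by a plain `max` parameter.

```java
// Hypothetical helper, for illustration only.
final class BoundedGrowth {
    // Doubles the capacity until it covers `required`, but never beyond `max`.
    static int nextCapacity(int current, int required, int max) {
        if (required > max) {
            throw new IllegalStateException("required " + required + " exceeds maximum " + max);
        }
        long capacity = Math.max(current, 1); // long arithmetic so doubling cannot overflow
        while (capacity < required) {
            capacity *= 2;
        }
        return (int) Math.min(capacity, (long) max);
    }
}
```

With `max` set to Integer.MAX_VALUE the policy degenerates to plain doubling, which matches the backwards-compatible default mentioned in the commit.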
Tests that expect connection failure were hardcoding ports (9000, 19999) which could collide with running services. When a QuestDB server is running on port 9000, the WebSocket connection succeeds and the test fails with "Expected LineSenderException". Replace hardcoded ports with dynamically allocated ephemeral ports via ServerSocket(0). The port is bound and immediately closed, guaranteeing nothing is listening when the test tries to connect. Affected tests:
- testBuilderWithWebSocketTransportCreatesCorrectSenderType
- testConnectionRefused
- testWsConfigString
- testWsConfigString_missingAddr_fails
- testWsConfigString_protocolAlreadyConfigured_fails
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
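The ephemeral-port technique can be sketched as follows. The class `FreePort` and the method `findUnusedPort` are hypothetical names, not the helper used in the tests. Another process could in principle bind the port between the close and the connection attempt, so this is a best-effort approach rather than a hard guarantee.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

// Hypothetical test helper, for illustration only.
final class FreePort {
    // Asks the OS for an ephemeral port, then releases it so nothing is listening on it.
    static int findUnusedPort() {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```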
The Sec-WebSocket-Accept header validation used case-sensitive String.contains(), which violates RFC 7230 (HTTP headers are case-insensitive). A server sending the header in a different casing (e.g., sec-websocket-accept) would cause the handshake to fail. Replace with a containsHeaderValue() helper that uses String.regionMatches(ignoreCase=true) for the header name lookup, avoiding both the case-sensitivity bug and unnecessary string allocation from toLowerCase(). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
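A case-insensitive header match built on String.regionMatches might look like the sketch below. The signature of containsHeaderValue() is not shown in the commit, so the parameters here are an assumption; the header name is compared ignoring case, while the value is compared exactly because the accept token is Base64.

```java
// Hypothetical version of the helper, for illustration only.
final class HeaderMatch {
    // True if `response` has a line "<name>: <value>" with a case-insensitive name and an exact value.
    static boolean containsHeaderValue(String response, String name, String expectedValue) {
        int n = response.length();
        int lineStart = 0;
        while (lineStart < n) {
            int lineEnd = response.indexOf("\r\n", lineStart);
            if (lineEnd < 0) {
                lineEnd = n;
            }
            int nameEnd = lineStart + name.length();
            if (nameEnd < lineEnd
                    && response.regionMatches(true, lineStart, name, 0, name.length())
                    && response.charAt(nameEnd) == ':') {
                int valueStart = nameEnd + 1;
                int valueEnd = lineEnd;
                // Trim optional whitespace around the value without allocating substrings.
                while (valueStart < valueEnd && isBlank(response.charAt(valueStart))) {
                    valueStart++;
                }
                while (valueEnd > valueStart && isBlank(response.charAt(valueEnd - 1))) {
                    valueEnd--;
                }
                if (valueEnd - valueStart == expectedValue.length()
                        && response.regionMatches(valueStart, expectedValue, 0, expectedValue.length())) {
                    return true;
                }
            }
            lineStart = lineEnd + 2;
        }
        return false;
    }

    private static boolean isBlank(char c) {
        return c == ' ' || c == '\t';
    }
}
```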
Replace byte-by-byte native-heap copies in writeToSocket and readFromSocket with Unsafe.copyMemory(), using the 5-argument form that bridges native memory and Java byte arrays via Unsafe.BYTE_OFFSET. Add WebSocketChannelTest with a local echo server that verifies data integrity through the copy paths across various payload sizes and patterns. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Move maxSentSymbolId and sentSchemaHashes updates to after the send/enqueue succeeds in both async and sync flush paths. Previously these were updated before the send, so if sealAndSwapBuffer() threw (async) or sendBinary()/waitForAck() threw (sync), the next batch's delta dictionary would omit symbols the server never received, silently corrupting subsequent data. Also move sentSchemaHashes.add() inside the messageSize > 0 guard in the sync path, where it was incorrectly marking schemas as sent even when no data was produced. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
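The ordering fix can be shown with a minimal sketch: state that records what the server has seen is updated only after the send returns normally. The class `DeltaState` and the `Transport` interface are hypothetical stand-ins; only the names maxSentSymbolId and sentSchemaHashes come from the commit.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the sender's delta-tracking state, for illustration only.
final class DeltaState {
    interface Transport {
        void send(byte[] payload);
    }

    private int maxSentSymbolId = -1;
    private final Set<Long> sentSchemaHashes = new HashSet<>();

    void flush(Transport transport, byte[] payload, int batchMaxSymbolId, long schemaHash) {
        // If send() throws, the two updates below never run, so the next batch
        // still includes the symbols and schema the server has not received.
        transport.send(payload);
        maxSentSymbolId = Math.max(maxSentSymbolId, batchMaxSymbolId);
        sentSchemaHashes.add(schemaHash);
    }

    int maxSentSymbolId() {
        return maxSentSymbolId;
    }

    boolean isSchemaSent(long schemaHash) {
        return sentSchemaHashes.contains(schemaHash);
    }
}
```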
The validate() range check used TYPE_DECIMAL256 (0x15) as the upper bound, which excluded TYPE_CHAR (0x16). CHAR columns would throw IllegalArgumentException on validation. Extend the upper bound to TYPE_CHAR and add tests covering all valid type codes, nullable CHAR, and invalid type rejection. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace raw AssertionError with LineSenderException when a token parameter is provided in ws:: or wss:: configuration strings. The else branch in config string parsing was unreachable when the code only supported HTTP and TCP, but became reachable after WebSocket support was added. Users now get a clear "token is not supported for WebSocket protocol" error instead of a cryptic AssertionError. Add test assertions for both ws:: and wss:: schemas with token. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
If the platform I/O allocation (Kqueue, Epoll, or FDSet) throws after super() succeeds, the parent resources (socket, sendBuffer, controlFrameBuffer, recvBufPtr) were never freed. Wrap each subclass allocation in a try-catch that calls close() on failure, which frees both parent and subclass resources. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace the O(N*M) getPendingBytes() scan that iterated all table buffers and their columns on every row commit. Instead, maintain a running pendingBytes total by measuring the delta from the current table buffer before/after nextRow(). Reset the counter at the same three points where pendingRowCount resets. This reduces the per-row cost of shouldAutoFlush() from O(N*M) to O(M) for the single active table. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
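The running-total idea can be sketched as below. `PendingBytesTracker` and its nested `TableBuffer` are hypothetical simplifications: the real table buffer sums its column sizes (the O(M) part), while this stand-in just adds a fixed number of bytes per row.

```java
// Hypothetical model of the running counter, for illustration only.
final class PendingBytesTracker {
    // Simplified table buffer that grows by a fixed amount per committed row.
    static final class TableBuffer {
        private final int bytesPerRow;
        private long bufferedBytes;

        TableBuffer(int bytesPerRow) {
            this.bytesPerRow = bytesPerRow;
        }

        long bufferedBytes() {
            return bufferedBytes;
        }

        void nextRow() {
            bufferedBytes += bytesPerRow;
        }
    }

    private long pendingBytes;

    // Measures only the active table before and after the commit, instead of rescanning all tables.
    void commitRow(TableBuffer table) {
        long before = table.bufferedBytes();
        table.nextRow();
        pendingBytes += table.bufferedBytes() - before;
    }

    boolean shouldAutoFlush(long autoFlushBytes) {
        return pendingBytes >= autoFlushBytes;
    }

    long pendingBytes() {
        return pendingBytes;
    }

    // Called wherever the pending row count is reset.
    void reset() {
        pendingBytes = 0;
    }
}
```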
NativeSegmentList.ensureCapacity() doubles newCapacity in a loop until it reaches the required size. When newCapacity exceeds Integer.MAX_VALUE / 2, the multiplication overflows to negative, causing the while loop to run infinitely. Guard against this by checking before each doubling and falling back to the exact required capacity when doubling would overflow. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
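The overflow guard can be sketched in isolation. The class `CapacityGrowth` and the method `grow` are hypothetical; the sketch only demonstrates checking against Integer.MAX_VALUE / 2 before each doubling and falling back to the exact required capacity.

```java
// Hypothetical helper, for illustration only.
final class CapacityGrowth {
    static int grow(int capacity, int required) {
        int newCapacity = Math.max(capacity, 1);
        while (newCapacity < required) {
            if (newCapacity > Integer.MAX_VALUE / 2) {
                // Doubling would overflow to a negative value and the loop would never end.
                newCapacity = required;
                break;
            }
            newCapacity *= 2;
        }
        return newCapacity;
    }
}
```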
If the second MicrobatchBuffer allocation (buffer1) throws, the already-allocated buffer0 leaks its native memory. Neither connect() nor createForTesting() catch constructor failures, so the cleanup must happen in the constructor itself. Wrap the buffer1 allocation in a try-catch that closes buffer0 before re-throwing. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
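The constructor-cleanup pattern used in this and the neighbouring leak fixes can be sketched as follows. `DoubleBuffered` and `NativeBuffer` are hypothetical stand-ins for the sender and MicrobatchBuffer, and the allocator is injected only so that a failure can be simulated.

```java
import java.util.function.Supplier;

// Hypothetical owner of two native buffers, for illustration only.
final class DoubleBuffered implements AutoCloseable {
    interface NativeBuffer extends AutoCloseable {
        @Override
        void close();
    }

    private final NativeBuffer buffer0;
    private final NativeBuffer buffer1;

    DoubleBuffered(Supplier<NativeBuffer> allocator) {
        NativeBuffer first = allocator.get();
        NativeBuffer second;
        try {
            second = allocator.get();
        } catch (Throwable t) {
            // No caller can free `first` if the constructor fails, so release it here.
            first.close();
            throw t;
        }
        this.buffer0 = first;
        this.buffer1 = second;
    }

    @Override
    public void close() {
        buffer0.close();
        buffer1.close();
    }
}
```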
Move NativeSegmentList, NativeBufferWriter, and SegmentedNativeBufferWriter allocations from field initializers into the constructor body, wrapped in a try-catch. If any allocation or the UdpLineChannel/HashMap construction throws, the catch block frees all previously allocated native resources via Misc.free() before re-throwing. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Pass CharSequence directly through the symbol and string column paths instead of eagerly converting to String. In stringColumn(), addString() already copies character data immediately via putUtf8(), so the toString() was pure waste. In the symbol path, propagate CharSequence through getOrAddGlobalSymbol() and addSymbolWithGlobalId() down to the dictionary lookup. GlobalSymbolDictionary.getOrAddSymbol() now accepts CharSequence and only calls toString() when a symbol is new and must be stored. CharSequenceIntHashMap.get() already accepts CharSequence for lookups, so repeated symbols incur zero allocation. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
WebSocketFrameWriter.writeHeader() wrote the mask key via Unsafe.putInt() in native byte order. maskPayload() also extracted mask bytes in native order. RFC 6455 specifies network (big-endian) byte order for the mask key on the wire. writeHeader() now writes the 4 mask key bytes individually in big-endian order. maskPayload() converts to native order for bulk XOR and extracts per-byte mask in big-endian order. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
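The byte-order rule can be checked against the masked "Hello" example in RFC 6455. The sketch below is a simplified per-byte version on a heap array; the class `Masking` is hypothetical, and it does not reproduce the bulk XOR in native order that the commit describes for maskPayload().

```java
// Hypothetical helper, for illustration only.
final class Masking {
    // Writes the 4 mask key bytes in network (big-endian) order.
    static void writeMaskKey(byte[] dst, int offset, int maskKey) {
        dst[offset] = (byte) (maskKey >>> 24);
        dst[offset + 1] = (byte) (maskKey >>> 16);
        dst[offset + 2] = (byte) (maskKey >>> 8);
        dst[offset + 3] = (byte) maskKey;
    }

    // XORs payload byte i with mask key byte (i mod 4), counting key bytes in wire order.
    static void maskPayload(byte[] payload, int offset, int len, int maskKey) {
        for (int i = 0; i < len; i++) {
            int shift = 24 - ((i & 3) << 3);
            payload[offset + i] ^= (byte) (maskKey >>> shift);
        }
    }
}
```

Because XOR is its own inverse, applying maskPayload() a second time with the same key restores the original bytes.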
Close the encoder's native memory when MicrobatchBuffer allocation fails in the constructor. Previously only buffer0 was cleaned up, leaving the NativeBufferWriter leaked. In ensureConnected(), close the WebSocket client if WebSocketSendQueue construction fails (e.g. Thread.start() throws OOM), preventing a dangling socket and I/O thread. In flushSync(), fail the in-flight window entry when sendBinary() throws, so close() does not hang on awaitEmpty() waiting for an ACK that will never arrive. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Rename SEGMENT_SIZE constant to ENTRY_SIZE for clarity in NativeSegmentList. Reorder methods alphabetically: move close() and ensureCapacity() before add() to match the project's member-ordering convention. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Extract encodeUtf8() that writes directly to native memory via Unsafe.putByte on a running address pointer, with a single upfront ensureCapacity call for the full UTF-8 length. Both putUtf8() and putString() now compute utf8Length once, reserve capacity once, then delegate to encodeUtf8(). This also eliminates the double string scan that putString() previously performed (utf8Length + putUtf8's per-char loop with per-byte ensureCapacity). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
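The "measure once, reserve once, then encode" structure might look like the sketch below. It writes into a heap byte array rather than native memory via Unsafe, and the class `Utf8Sketch` is hypothetical; replacing unpaired surrogates with '?' is an assumption chosen to match String.getBytes, not something the commit states.

```java
// Hypothetical encoder, for illustration only.
final class Utf8Sketch {
    // Number of UTF-8 bytes needed for s.
    static int utf8Length(CharSequence s) {
        int len = 0;
        for (int i = 0, n = s.length(); i < n; i++) {
            char c = s.charAt(i);
            if (c < 0x80) {
                len += 1;
            } else if (c < 0x800) {
                len += 2;
            } else if (Character.isHighSurrogate(c) && i + 1 < n && Character.isLowSurrogate(s.charAt(i + 1))) {
                len += 4;
                i++; // the pair encodes one code point
            } else if (Character.isSurrogate(c)) {
                len += 1; // unpaired surrogate becomes '?'
            } else {
                len += 3;
            }
        }
        return len;
    }

    // Encodes s into dst starting at pos with no per-byte capacity checks; returns the end position.
    static int encodeUtf8(CharSequence s, byte[] dst, int pos) {
        for (int i = 0, n = s.length(); i < n; i++) {
            char c = s.charAt(i);
            if (c < 0x80) {
                dst[pos++] = (byte) c;
            } else if (c < 0x800) {
                dst[pos++] = (byte) (0xC0 | (c >> 6));
                dst[pos++] = (byte) (0x80 | (c & 0x3F));
            } else if (Character.isHighSurrogate(c) && i + 1 < n && Character.isLowSurrogate(s.charAt(i + 1))) {
                int cp = Character.toCodePoint(c, s.charAt(++i));
                dst[pos++] = (byte) (0xF0 | (cp >> 18));
                dst[pos++] = (byte) (0x80 | ((cp >> 12) & 0x3F));
                dst[pos++] = (byte) (0x80 | ((cp >> 6) & 0x3F));
                dst[pos++] = (byte) (0x80 | (cp & 0x3F));
            } else if (Character.isSurrogate(c)) {
                dst[pos++] = (byte) '?';
            } else {
                dst[pos++] = (byte) (0xE0 | (c >> 12));
                dst[pos++] = (byte) (0x80 | ((c >> 6) & 0x3F));
                dst[pos++] = (byte) (0x80 | (c & 0x3F));
            }
        }
        return pos;
    }

    // Computes the length once, reserves once, then delegates to encodeUtf8().
    static byte[] encode(CharSequence s) {
        byte[] dst = new byte[utf8Length(s)];
        int end = encodeUtf8(s, dst, 0);
        if (end != dst.length) {
            throw new IllegalStateException("length mismatch");
        }
        return dst;
    }
}
```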
With case-sensitive names, the client would add the same column twice if the user wrote the same name in a different case.
QwpTableBuffer.createColumn() created new column buffers with size = 0 regardless of how many rows were already committed. When the caller added a value for the current row, it landed at position 0 (the first row's slot). The subsequent nextRow() call padded nulls after the value, misaligning the column data with all other columns. The fix pre-pads the new column with nulls for all committed rows so the next value lands at the correct row position. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
When the user added a value to the same column twice for the current row, the client wrote the second value as the value for the next row, breaking the invariant that all columns have the same length. The fix aligns the behavior with ILP: the first write wins and later writes are silently ignored.
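The three column-alignment fixes above (case-insensitive names, null pre-padding for late columns, first write wins) share one invariant: every column has the same length after each row. The toy model below illustrates it with heap lists; the class `ColumnarRows` is hypothetical and far simpler than QwpTableBuffer, which stores data off-heap.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical columnar row buffer, for illustration only.
final class ColumnarRows {
    private final Map<String, List<Object>> columns = new LinkedHashMap<>();
    private int committedRows;

    void put(String name, Object value) {
        String key = name.toLowerCase(Locale.ROOT); // "Price" and "price" are the same column
        List<Object> column = columns.get(key);
        if (column == null) {
            column = new ArrayList<>();
            // Pre-pad with nulls so the first value lands in the current row's slot.
            for (int i = 0; i < committedRows; i++) {
                column.add(null);
            }
            columns.put(key, column);
        }
        if (column.size() > committedRows) {
            return; // the current row already has a value: first write wins
        }
        column.add(value);
    }

    void nextRow() {
        committedRows++;
        // Columns not written in this row get a null so all lengths stay equal.
        for (List<Object> column : columns.values()) {
            while (column.size() < committedRows) {
                column.add(null);
            }
        }
    }

    List<Object> column(String name) {
        return columns.get(name.toLowerCase(Locale.ROOT));
    }
}
```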
flushSync() wraps the NullPointerException from sendBinary() on a null client in a LineSenderException via its catch (Throwable) block. The test was catching the unwrapped NullPointerException, which never arrived, causing the test to fail with an unhandled LineSenderException. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
QwpWebSocketSender.checkedColumnName() validated every column name on every row write, even though getOrCreateColumn() finds the column already exists in most cases. Move the validation into QwpTableBuffer.getOrCreateColumn() so it runs only when a new column is created. Add getOrCreateDesignatedTimestampColumn() for the designated timestamp, which uses an empty-string sentinel name and must bypass column name validation. Both QwpWebSocketSender and QwpUdpSender now use this dedicated method. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
"Nullable" is a confusing concept due to QuestDB null sentinels, which means a "non-nullable" column could still hold nulls via sentinel values.
This avoids extra work when only the global dictionary is needed. We need to keep the per-column dictionaries because of UDP, for now.
…xperiment_new_ilp
TODO in run_tests_pipeline.yaml! Change before merging!
Change to
Summary
This PR adds a new WebSocket-based ingestion path to the Java client using QWP (QuestDB Wire Protocol), a binary protocol that replaces text-based ILP for higher-throughput data ingestion. The existing HTTP and TCP ILP senders remain unchanged.
Users select the new transport via Sender.builder(Transport.WEBSOCKET). The builder accepts WebSocket-specific options such as asyncMode, autoFlushBytes, autoFlushIntervalMillis, and inFlightWindowSize.
Architecture
The implementation follows a layered design:
Protocol layer (cutlass/qwp/protocol/)
- QwpTableBuffer stores rows in columnar format using off-heap memory (zero-GC on the data path).
- QwpSchemaHash computes XXHash64 over column names and types, enabling server-side schema caching. The client sends a full schema on the first batch and a hash reference on subsequent batches if the schema has not changed.
- QwpGorillaEncoder applies delta-of-delta compression to timestamp columns.
- QwpBitWriter handles bit-level packing for booleans and null bitmaps.
- QwpConstants defines the wire format: "QWP1" magic bytes, type codes, feature flags, status codes.
Client layer (cutlass/qwp/client/)
- QwpWebSocketSender implements the Sender interface. It uses a double-buffering scheme: the user thread writes rows into an active MicrobatchBuffer, which is sealed and handed to an I/O thread when an auto-flush trigger fires (row count, byte size, or time interval).
- QwpWebSocketEncoder serializes QwpTableBuffer contents into binary QWP frames, including delta symbol dictionaries (only new symbols since the last acknowledged batch).
- InFlightWindow implements a lock-free sliding window protocol that tracks batches awaiting server ACKs, providing backpressure from the server to the user thread.
- WebSocketSendQueue runs the dedicated I/O thread, managing frame transmission and ACK/NACK response parsing.
- GlobalSymbolDictionary assigns sequential integer IDs to symbol strings and supports delta encoding across batches.
WebSocket transport (cutlass/http/client/, cutlass/qwp/websocket/)
- WebSocketClient is a zero-GC WebSocket implementation with platform-specific subclasses for Linux (epoll), macOS (kqueue), and Windows (select).
- WebSocketFrameParser and WebSocketFrameWriter handle RFC 6455 frame serialization, including fragmentation, close-frame echo, and ping/pong.
- WebSocketSendBuffer builds masked WebSocket frames directly in native memory.
Bug fixes and robustness improvements
The PR fixes a number of issues found during development and testing:
- WebSocketClient constructor and on allocation failure.
- sendQueue leak on close when flush fails.
- WebSocketClient, WebSocketSendBuffer, QwpTableBuffer), array dimension products, and putBlockOfBytes().
- receiveFrame() throwing instead of returning false, which masked I/O errors as timeouts.
- cancelRow() truncation.
- SecureRnd (ChaCha20-based CSPRNG) for WebSocket masking keys instead of java.util.Random.
Code cleanup
The PR removes ~11,000 lines of dead code:
- ConcurrentHashMap (3,791 lines), ConcurrentIntHashMap (3,612 lines), GenericLexer, Base64Helper, LongObjHashMap, FilesFacade, and others.
- Numbers, Chars, Utf8s, Rnd, and ColumnType.
- ParanoiaState, GeoHashes, BorrowedArray, HttpCookie.
CI changes
- ClientIntegrationTests CI stage that starts a QuestDB server and runs the client's integration tests against it (both default and authenticated configurations).
- sed portability for macOS CI runners.
Test plan
- QwpSenderTest (8,346 lines) exercises the full Sender API surface for all column types, null handling, cancelRow, schema changes, and error paths
- QwpWebSocketSenderTest tests WebSocket-specific sender behavior including async mode
- QwpWebSocketEncoderTest validates binary encoding for all column types and encoding modes
- LineSenderBuilderWebSocketTest covers builder validation and configuration for the WebSocket transport
- assertMemoryLeak wrappers added to client tests to detect native memory leaks